# K-Means Clustering
#
# Problem Statement:
# The input data contains samples of cars and technical / price
# information about them. The goal is to group these cars into
# clusters based on their attributes (k = 3 in the model below).
#
# Techniques used:
# 1. K-Means Clustering
# 2. Centering and Scaling
# -*- coding: utf-8 -*-

import os

os.chdir("/home/cloudops/spark")
os.getcwd()    # confirm the working directory

# =====================================
# Load the CSV file into an RDD
# =====================================
autoData = sc.textFile("data/auto-data.csv")
autoData.cache()
# print(autoData)
# auto-data.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

# =====================================
# Remove the first line (contains headers)
firstLine = autoData.first()
dataLines = autoData.filter(lambda x: x != firstLine)
dataLines.count()    # 197
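
# Alternative sketch: the header can also be dropped by partition index,
# which avoids comparing every row to the header string (assumes the
# header is the first record of partition 0; dataLinesAlt is an
# illustrative name)
import itertools
dataLinesAlt = autoData.mapPartitionsWithIndex(
    lambda idx, it: itertools.islice(it, 1, None) if idx == 0 else it)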

# =====================================
from pyspark.mllib.linalg import Vectors

# Convert each CSV line into a dense local vector of numeric features
def transformToNumeric(inputStr):

    attList = inputStr.split(",")

    # Encode the categorical attributes as numbers
    doors = 1.0 if attList[3] == "two" else 2.0
    body = 1.0 if attList[4] == "sedan" else 2.0

    # Keep only the columns wanted at this stage:
    # DOORS, BODY, HP, RPM, MPG-CITY
    values = Vectors.dense([doors,
                            body,
                            float(attList[7]),
                            float(attList[8]),
                            float(attList[9])
                            ])
    return values

autoVector = dataLines.map(transformToNumeric)
autoVector.persist()
autoVector.collect()
# [DenseVector([1.0, 2.0, 69.0, 4900.0, 31.0]),
#  DenseVector([1.0, 2.0, 48.0, 5100.0, 47.0]),
#  DenseVector([1.0, 2.0, 68.0, 5000.0, 30.0]),
#  DenseVector([1.0, 2.0, 62.0, 4800.0, 35.0]),
# . . .

# =====================================
# Centering and Scaling
# =====================================
# The attribute ranges differ widely (tens, hundreds, thousands),
# so normalize them with centering and scaling:
# every value has its column's mean subtracted from it and is
# then divided by that column's standard deviation (a z-score).

# Perform a statistical analysis to compute the mean and
# std. dev of every column
from math import sqrt
from pyspark.mllib.stat import Statistics

autoStats = Statistics.colStats(autoVector)

# Mean
colMeans = autoStats.mean()
# [1.56852792e+00 1.53299492e+00 1.03604061e+02 5.11802030e+03  2.51522843e+01]

# Standard Deviation
colVariance = autoStats.variance()
# [2.46555475e-01 2.50181291e-01 1.41670978e+03 2.31395551e+05 4.14460789e+01]
colStdDev = [sqrt(x) for x in colVariance]
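
# Illustration using the stats above: the HP column (index 2) has mean
# ~103.6 and std. dev ~37.6, so an HP value of 69 standardizes to
# (69 - 103.6) / 37.6, i.e. about -0.92
(69.0 - colMeans[2]) / colStdDev[2]    # ~ -0.92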

# Place the means and std. dev values in broadcast variables
bcMeans = sc.broadcast(colMeans)
bcStdDev = sc.broadcast(colStdDev)

# Standardize one vector using the broadcast column statistics
def centerAndScale(inVector):
    global bcMeans
    global bcStdDev

    meanArray = bcMeans.value
    stdDevArray = bcStdDev.value

    valueArray = inVector.toArray()
    retArray = []
    for i in range(valueArray.size):
        retArray.append((valueArray[i] - meanArray[i]) / stdDevArray[i])
    return Vectors.dense(retArray)

csAuto = autoVector.map(centerAndScale)
csAuto.collect()     # PythonRDD
# . . .
# DenseVector([-1.145, 0.9337, 2.747, 1.6256, -1.2663]),
# DenseVector([0.869, -1.0656, 2.136, -1.2848, -1.7323]),
# DenseVector([-1.145, -1.0656, 2.0828, 0.5862, -1.4216]),
# DenseVector([-1.145, 0.9337, 2.136, -1.2848, -1.7323])]

# The values are now standardized (z-scores); most fall roughly within [-2, 2]
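
# Alternative sketch: pyspark.mllib.feature.StandardScaler performs the
# same centering and scaling on an RDD of vectors, without the manual
# broadcast-variable bookkeeping above (csAutoAlt is just an illustrative
# name)
from pyspark.mllib.feature import StandardScaler

scaler = StandardScaler(withMean=True, withStd=True).fit(autoVector)
csAutoAlt = scaler.transform(autoVector)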

# =====================================
# Create a Data Frame with all features
# =====================================
from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)

autoRows = csAuto.map(lambda f: Row(features=f))
autoDf = sqlContext.createDataFrame(autoRows)   # DataFrame[features: vector]

autoDf.select("features").show(5)
# +--------------------+
# |            features|
# +--------------------+
# |[-1.1449709581789...|
# |[-1.1449709581789...|
# |[-1.1449709581789...|
# . . .

# =====================================
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.mllib.util import MLUtils

# pyspark.ml.clustering.KMeans expects ml vectors, but the DataFrame
# built above holds mllib vectors. Fitting it directly fails with:
# ERROR: IllegalArgumentException: 'requirement failed:
# Column features must be of type equal to one of the following types:
# [struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>]
# but was actually of type
# struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.'
# (the two struct types print identically, but one is the ml VectorUDT
# and the other the mllib VectorUDT)
# Fix: convert the features column to ml vectors first
autoDf = MLUtils.convertVectorColumnsToML(autoDf, "features")

# Train a k-means model with k=3 clusters
kmeans = KMeans(k=3, seed=1)
# Equivalent builder style: kmeans = KMeans().setK(3).setSeed(1)
model = kmeans.fit(autoDf)

predictions = model.transform(autoDf)
predictions.collect()
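
# Inspect the fitted model: the cluster centers (in the scaled feature
# space) and how many cars were assigned to each cluster
for center in model.clusterCenters():
    print(center)

predictions.groupBy("prediction").count().show()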

# =====================================
# Evaluate the clustering by computing the Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# =====================================
# Plot the results in a scatter plot
# =====================================
import pandas as pd

# Flatten each prediction Row into a tuple, then convert to Pandas
def unstripData(instr):
    return (instr["prediction"],
            instr["features"][0],
            instr["features"][1],
            instr["features"][2],
            instr["features"][3])

# predictions is a DataFrame, so map over its underlying RDD
unstripped = predictions.rdd.map(unstripData)
predList = unstripped.collect()

predPd = pd.DataFrame(predList)

# =====================================
import matplotlib.pyplot as plt
plt.cla()
plt.scatter(predPd[3], predPd[4], c=predPd[0])
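
# Label the axes and render the figure; per the field list above,
# predPd[3] and predPd[4] hold the scaled HP and RPM values.
# plt.show() is only needed outside an interactive shell.
plt.xlabel("HP (scaled)")
plt.ylabel("RPM (scaled)")
plt.show()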